#!/usr/bin/env python

import base64
import errno
import os
import shelve
import shutil
import socket
import sqlite3
import struct
import sys
import threading
import time
from optparse import OptionParser
from stat import S_ISDIR

import paramiko

__DB_HASH__ = 10
PREFIX = '/opt/fusionstack/data/disk/bitmap/'
DBPREFIX = '/opt/fusionstack/data/chunk/'

chunktype = ['null', 'pool', 'subpool', 'vol', 'subvol', 'raw']

hostname = socket.gethostname()
WORKDIR = os.path.join('/tmp/dbdump')

def db_iterater(dbid):
    """Yield raw-chunk records from one hashed sqlite chunk database.

    Opens ``DBPREFIX/<dbid>.db`` and walks every row of the ``raw``
    table.  Each row's ``key`` column is a base64-encoded ``QII`` struct
    (volume id, chunk-type index, chunk index); yields a dict
    ``{'vol', 'id', 'disk', 'offset'}`` per row.
    """
    conn = sqlite3.connect(DBPREFIX + "%d.db" % dbid)
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('select * from raw')
            # Iterate the cursor directly instead of fetchone()-until-None.
            for item in cursor:
                chkid = struct.unpack('QII', base64.b64decode(item['key']))
                yield {'vol': chkid[0],
                       'id': "%s.%d.%d" % (chunktype[chkid[1]], chkid[0], chkid[2]),
                       'disk': item['disk'],
                       'offset': item['offset']}
        finally:
            cursor.close()
    finally:
        # BUG FIX: the original leaked the connection/cursor whenever the
        # consumer abandoned the generator early or a row failed to decode.
        conn.close()

def db_dump_th(dbidx, vol_files, lock):
    """Worker thread: dump one hashed chunk db into per-volume shelves.

    dbidx     -- index of the sqlite database (0 .. __DB_HASH__ - 1)
    vol_files -- shared dict {vol: {'db': shelve, 'lock': Lock}}
    lock      -- global lock guarding vol_files and stdout
    """
    lock.acquire()
    print('-- dbidx:%d' % (dbidx, ))
    lock.release()

    for item in db_iterater(dbidx):
        chkid = item['id']
        vol = item['vol']

        lock.acquire()
        if vol not in vol_files:
            sle = shelve.open('%s/%d.db' % (WORKDIR, vol, ), writeback=True)
            chklock = threading.Lock()
            # BUG FIX: the freshly created per-volume lock was discarded
            # and the shared global lock stored in its place.
            vol_files[vol] = {'db': sle, 'lock': chklock}
            vol_files[vol]['db']['raw'] = []
        lock.release()

        chklock = vol_files[vol]['lock']

        # Per-volume lock serializes appends to this volume's shelve list.
        chklock.acquire()
        vol_files[vol]['db']['raw'].append(chkid.split('.')[-1])
        chklock.release()

def db_dump():
    """Dump all __DB_HASH__ chunk databases concurrently into WORKDIR.

    Spawns one db_dump_th per hashed database, waits for them all, then
    closes (and, because writeback=True, flushes) every per-volume shelve.
    """
    th = []
    vol_files = {}
    lock = threading.Lock()

    for i in range(0, __DB_HASH__):
        th.append(threading.Thread(target=db_dump_th, args=(i, vol_files, lock)))

    for t in th:
        t.start()

    for t in th:
        t.join()

    print("collect finish")

    # Only the shelve handles are needed here; keys are unused.
    for v in vol_files.values():
        v['db'].close()

class Exp(Exception):
    """Exception carrying an errno, a message and optional captured stdout."""

    def __init__(self, errno, err, out=None):
        self.errno = errno
        self.err = err
        self.out = out

    def __str__(self):
        info = 'errno:%s, err:%s' % (self.errno, self.err)
        if self.out is None:
            return repr(info)
        return repr(info + ' stdout:' + self.out)

def _session_recv(session):
    try:
        data = session.recv(4096)
    except socket.timeout as err:
        data = ""

    return data

def _session_recv_stderr(session):
    try:
        data = session.recv_stderr(4096)
    except socket.timeout as err:
        data = ""

    return data

def _exec_remote2(host, cmd, user = "root", password=None, timeout = 10, exectimeout=28800):
    """Run *cmd* on *host* over SSH; return (stdout, stderr, exit_status).

    timeout     -- TCP/SSH connect timeout in seconds
    exectimeout -- overall wall-clock limit for the remote command

    Raises Exp on socket errors, authentication failure or exectimeout.
    """
    stdout = ""
    stderr = ""
    status = 0

    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    session = None
    try:
        client.connect(host, 22, user, password, timeout = timeout)
        transport = client.get_transport()
        session = transport.open_channel(kind='session')
        session.settimeout(3)
        session.exec_command(cmd)

        now1 = time.time()
        while True:
            if session.recv_ready():
                stdout = stdout + _session_recv(session)

            if session.recv_stderr_ready():
                stderr = stderr + _session_recv_stderr(session)

            if session.exit_status_ready():
                # Drain whatever is still buffered after the command exited.
                while True:
                    data = _session_recv(session)
                    if data == "":
                        break
                    stdout = stdout + data

                while True:
                    data = _session_recv_stderr(session)
                    if data == "":
                        break
                    stderr = stderr + data

                break

            now2 = time.time()
            if (now2 - now1) > exectimeout:
                # BUG FIX: 'errno' was referenced without ever being
                # imported, turning every exec timeout into a NameError.
                raise Exp(errno.ETIMEDOUT,
                          "timeout, now1: %s, now2: %s, exectimeout: %s" % (now1, now2, exectimeout))

        status = session.recv_exit_status()

    except socket.timeout as err:
        raise Exp(err.errno, 'Socket timeout')
    except socket.error as err:
        raise Exp(err.errno, err.strerror)
    except paramiko.AuthenticationException:
        raise Exp(250, 'Authentication failed')
    finally:
        # BUG FIX: the original skipped cleanup on every error path, and
        # 'session' was unbound if connect() itself failed.
        if session is not None:
            session.close()
        client.close()

    return stdout, stderr, status

def _exec_remote1(host, cmd, user = "root", password=None, timeout = 10, exectimeout=28800):
    """Like _exec_remote2, but raise Exp when the command exits non-zero."""
    out, err, rc = _exec_remote2(host, cmd, user, password, timeout, exectimeout)
    if rc != 0:
        raise Exp(rc, out + " " + err)
    return out, err

def _list_remote(host, remote, user = "root", password='password', timeout = 10):
    """Return the names of the regular files in *remote* on *host* via SFTP.

    Directories are filtered out.  Raises Exp on connect/auth/IO failure.
    """
    s = paramiko.SSHClient()
    s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sftp = None
    try:
        s.connect(host, 22, user, password, timeout = timeout)
        sftp = s.open_sftp()
        files = sftp.listdir_attr(remote)
    except socket.timeout as err:
        raise Exp(err.errno, err.strerror)
    except socket.error as err:
        raise Exp(err.errno, err.strerror)
    except paramiko.AuthenticationException:
        raise Exp(250, 'Authentication failed')
    # BUG FIX: 'except IOError, e' is Python-2-only syntax, and the SFTP/SSH
    # handles leaked whenever an exception propagated.
    except IOError as e:
        raise Exp(e.errno, e.strerror)
    finally:
        if sftp is not None:
            sftp.close()
        s.close()
    return [a.filename for a in files if not S_ISDIR(a.st_mode)]

def _get_remote(host, remote, local, user = "root", password='password', timeout = 10):
    """Fetch *remote* from *host* into *local*, then delete the remote copy.

    Raises Exp on connect/auth/IO failure.
    """
    s = paramiko.SSHClient()
    s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sftp = None
    try:
        s.connect(host, 22, user, password, timeout = timeout)
        sftp = s.open_sftp()
        sftp.get(remote, local)
        # Move semantics: the remote side is cleaned up after a good copy.
        sftp.remove(remote)
    except socket.timeout as err:
        raise Exp(err.errno, err.strerror)
    except socket.error as err:
        raise Exp(err.errno, err.strerror)
    except paramiko.AuthenticationException:
        raise Exp(250, 'Authentication failed')
    # BUG FIX: 'except IOError, e' is Python-2-only syntax, and the SFTP/SSH
    # handles leaked whenever an exception propagated.
    except IOError as e:
        raise Exp(e.errno, e.strerror)
    finally:
        if sftp is not None:
            sftp.close()
        s.close()

def _put_remote(host, local, remote, user = "root", password='password', timeout = 10):
    """Upload the local file *local* to *remote* on *host* via SFTP.

    Raises Exp on connect/auth/IO failure.
    """
    s = paramiko.SSHClient()
    s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sftp = None
    try:
        s.connect(host, 22, user, password, timeout = timeout)
        sftp = s.open_sftp()
        sftp.put(local, remote)
    except socket.timeout as err:
        raise Exp(err.errno, err.strerror)
    except socket.error as err:
        raise Exp(err.errno, err.strerror)
    except paramiko.AuthenticationException:
        raise Exp(250, 'Authentication failed')
    # BUG FIX: 'except IOError, e' is Python-2-only syntax, and the SFTP/SSH
    # handles leaked whenever an exception propagated.
    except IOError as e:
        raise Exp(e.errno, e.strerror)
    finally:
        if sftp is not None:
            sftp.close()
        s.close()

def _mkdir_remote(host, path, user = "root", password='password', timeout = 10):
    """Create directory *path* on *host* via SFTP (best effort).

    An IOError (e.g. the directory already exists) is deliberately
    swallowed; connect/auth failures still raise Exp.
    """
    s = paramiko.SSHClient()
    s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sftp = None
    try:
        s.connect(host, 22, user, password, timeout = timeout)
        sftp = s.open_sftp()
        sftp.mkdir(path)
    except socket.timeout as err:
        raise Exp(err.errno, err.strerror)
    except socket.error as err:
        raise Exp(err.errno, err.strerror)
    except paramiko.AuthenticationException:
        raise Exp(250, 'Authentication failed')
    # BUG FIX: 'except IOError, e' is Python-2-only syntax, and the SFTP/SSH
    # handles leaked whenever an exception propagated.
    except IOError:
        pass
    finally:
        if sftp is not None:
            sftp.close()
        s.close()

def _dir_sync(host, remote, local, user = "root", password='password', timeout = 10):
    """Pull every regular file under *remote* on *host* into *local*."""
    for name in _list_remote(host, remote, user, password, timeout):
        src = os.path.join(remote, name)
        dst = os.path.join(local, name)
        _get_remote(host, src, dst, user, password, timeout)

def _rack_get(hostname):
     split = hostname.split('.')
     if (len(split) == 3):
        return split[1]
     else:
        return 'default'

def db_analysis_th(host, nid, lock):
    """Worker thread: run this script remotely with --dbdump and pull results.

    Copies this script to *host*, executes 'python <script> --dbdump'
    there to produce the per-volume shelves, then syncs them down into
    WORKDIR/<host> locally.

    nid is currently unused here; kept for signature symmetry with the
    thread spawner in db_analysis.
    """
    lock.acquire()
    print('--dbdump %s' % (host, ))
    lock.release()

    pydir = os.path.join(WORKDIR, os.path.split(__file__)[-1])
    _mkdir_remote(host, WORKDIR)
    _put_remote(host, os.path.abspath(__file__), pydir)

    cmd = 'python %s --dbdump' % (pydir, )
    _exec_remote1(host, cmd)

    workdir = os.path.join(WORKDIR, host)
    if not os.path.exists(workdir):
        os.mkdir(workdir)

    _dir_sync(host, WORKDIR, workdir)

def vol_analysis(volid, repnum, n2h, h2n):
    """Check replica count and rack placement for one volume.

    volid  -- volume id string
    repnum -- expected number of replicas per raw chunk
    n2h    -- {nid: hostname}
    h2n    -- {hostname: nid}

    Prints an ERROR line for every raw chunk whose replica count differs
    from repnum or whose replicas share a rack.
    """
    volume = {}
    print('analysis volid %s' % (volid, ))
    for nid, host in n2h.items():
        obj = os.path.join(WORKDIR, host, volid + '.db')
        if os.path.exists(obj):
            # BUG FIX: the original leaked the shelve handle.
            sle = shelve.open(obj)
            try:
                volume[host] = sle['raw']
            finally:
                sle.close()

    # raw chunk id -> list of node ids holding a replica of it
    raw = {}
    for host, chunks in volume.items():
        nid = h2n[host]
        for chkid in chunks:
            if chkid not in raw:
                raw[chkid] = []
            if nid not in raw[chkid]:
                raw[chkid].append(nid)

    volume.clear()
    print('vol: %s raw: %d' % (volid, len(raw)))
    for rawid, rep in raw.items():
        if len(rep) != repnum:
            print('ERROR repnum raw: %s rep %d %s' % (rawid, len(rep), str(rep)))

        rack = [_rack_get(n2h[r]) for r in rep]

        if len(rep) <= repnum:
            # Two replicas in the same rack collapse the rack set below len(rep).
            if len(set(rack)) != len(rep):
                hosts = [n2h[i] for i in rep]
                print('ERROR rack raw: %s rep %d [%s]' % (rawid, len(rep), ','.join((hosts))))

def db_analysis(repnum=3):
    """Collect per-host chunk dumps cluster-wide and verify replica layout.

    Reads the node list from etcd (/lich4/node), runs db_analysis_th on
    every host in parallel, then checks every discovered volume with
    vol_analysis.

    repnum -- expected replica count (default 3)
    """
    # pip install python-etcd
    import etcd
    th = []
    n2h = {}
    h2n = {}
    client = etcd.Client(port=2379)
    rst = client.get('/lich4/node')

    lock = threading.Lock()
    for hst in rst.children:
        host = hst.key.split('/')[-1]
        nid = int(hst.value)
        n2h[nid] = host
        h2n[host] = nid
        th.append(threading.Thread(target=db_analysis_th, args=(host, nid, lock)))

    for t in th:
        t.start()

    for t in th:
        t.join()

    print('begin analysis')
    # Every '<vol>.db' under WORKDIR/<host>/ names a volume to check.
    # (A single walk replaces the original redundant nested os.walk.)
    vols = set()
    for root, dirs, files in os.walk(WORKDIR):
        if root == WORKDIR:
            continue  # per-host shelves live one level down, skip the top
        for fname in files:
            vols.add(fname.split('.')[0])

    for vol in vols:
        vol_analysis(vol, repnum, n2h, h2n)

def main():
    """Parse command-line options and dispatch to dump or replica check."""
    parser = OptionParser()

    parser.add_option("-d", "--dbdump", action='store_true', default=False,
            dest="dbdump", help="--dbdump")
    parser.add_option("-r", "--repcheck", nargs=1, type='int', default=False,
            dest="repcheck", help="--repcheck default_repnum")

    options, args = parser.parse_args()

    if options.dbdump:
        # Start from an empty working directory on every run.
        if os.path.exists(WORKDIR):
            shutil.rmtree(WORKDIR)
        os.mkdir(WORKDIR)

        db_dump()

    if options.repcheck:
        db_analysis(options.repcheck)

if __name__ == '__main__':
    main()


